package xiaoa.java.jms.activeMQ;

import java.io.IOException;



import java.util.ArrayList;

import java.util.List;

import javax.jms.Connection;

import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.transport.TransportListener;

import xiaoa.java.log.L;

/**
 * 服务Test
 * @author xiaoa
 * @date 2017年3月17日 下午1:39:37
 * @version V1.0
 *
 */
public class ServerTest {
	
	//  创建一个jms连接工厂
	private static ActiveMQConnectionFactory    connectionFactory = null;
	
    // Connection ：JMS 客户端到JMS Provider 的连接
	private static Connection connection = null;
	
	private final static String url = "failover:(tcp://192.168.218.138:61616?wireFormat.maxInactivityDuration=0,tcp://192.168.218.1:61616?wireFormat.maxInactivityDuration=0)"
			+ "?initialReconnectDelay=1000&jms.prefetchPolicy.all=100?maxReconnectDelay=30000000&maxReconnectAttempts=-1&warnAfterReconnectAttempts=0";

	
	private static final String QUEUE_NAME = "xiaoaQueue";
    
	/**
	 * 初始化
	 * @Title: init
	 * @throws Throwable
	 * @author xiaoa
	 */
    public  static void init () throws Throwable{
    	
        try {
        	
        	L.info(url);
        	
        	 // 创建连接工厂
            connectionFactory = new ActiveMQConnectionFactory(url);
            
    		// 构造从工厂得到连接对象
    		connection       = connectionFactory.createConnection();
    		
    		if (connection instanceof ActiveMQConnection){
    			
    			
    			L.info("============= 添加监听");
    			ActiveMQConnection   activeMQConnection  = (ActiveMQConnection)connection;
    			
    			
    			activeMQConnection.addTransportListener(new TransportListener() {
					
					@Override
					public void transportResumed() {
						System.out.println("===>> 重连");
						
					}
					
					@Override
					public void transportInterupted() {
						System.out.println("===>> 断开");
					}
					
					@Override
					public void onException(IOException error) {
						
						System.out.println("===>> 出错 error = " + error);

					}
					
					@Override
					public void onCommand(Object command) {
						System.out.println("===>>  提交  ： " + command.toString());
						
					}
				});
    			
    		}
    		
    		
    		connection.start();
    	
	    	
    	    // 创建一个消费者
    	    
    	    for (int i = 0; i < 1 ; i ++){
    	    	
    	    	final int consumerIndex = i;
    	    	
    	    	// Session： 一个发送或接收消息的线程
        	    Session session  = connection.createSession(Boolean.FALSE,Session.CLIENT_ACKNOWLEDGE);
    	    	
    	    	// 创建一个消费者
    	    	MessageConsumer  consumer = session.createConsumer(session.createQueue(QUEUE_NAME));
    	    	
				L.info("==================== 消费者 : " + consumerIndex  + " 等待");
				
				// 创建一个错误队列
				final List<String>  errorIdList  = new ArrayList<String>();
				
//				errorIdList.add("ID:xiaoa-3516-1489887706969-0:1:1:1:5");
//				errorIdList.add("ID:xiaoa-3516-1489887706969-0:1:1:1:6");
//				errorIdList.add("ID:xiaoa-3516-1489887706969-0:1:1:1:7");

				
				// 设置一个监听
    	    	consumer.setMessageListener( new MessageListener() {
					
					@Override
					public void onMessage(Message message) {
						
						try {
							String threadid = Thread.currentThread().toString();
							
							L.info("消费者 threadid = " + threadid + "  messageId = " + message.getJMSMessageID() + "  consumerIndex = " + consumerIndex + "==================  ");
							
							if (message.getJMSRedelivered()){
								L.info("消息被重发" + message.getJMSMessageID());
							//	System.exit(1);
							}
							
							if (errorIdList.contains(message.getJMSMessageID())){
								throw new RuntimeException(" messageId = " + message.getJMSMessageID());
							}
		
							message.acknowledge();
							//Thread.sleep(1 * 1000);

							
						} catch (Exception e) {
							e.printStackTrace();
							throw new RuntimeException(e);
						}
						
						
					}
				});

    	    }
          
    	    System.out.println("完成");
    	    
        } catch (Exception e) {
            e.printStackTrace();
            if (null != connection){
                connection.close();
            }
        } 
        
    }
    
    
    public static void main(String[] args) throws  Throwable {
		init();
	}

}
